/*
 * Copyright 2013-16 Board of Trustees of Stanford University
 * Copyright 2013-16 Ecole Polytechnique Federale Lausanne (EPFL)
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */

#include <errno.h>
#include <limits.h>
#include <pthread.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#include <ixev.h>
#include <mempool.h>
#include <ix/list.h>

/*
 * Round num up to the next multiple of multiple using integer arithmetic.
 * multiple is expanded three times, so do not pass an expression with side
 * effects.
 */
#define ROUND_UP(num, multiple) ((((num) + (multiple) - 1) / (multiple)) * (multiple))


/*
 * Message header. create_header() in this file writes type, src_ip, port and
 * len_msg; it does not write num_msg or ack.
 */
typedef struct _msg_header {
		uint8_t type;		/* header_type value passed to create_header() */
		uint8_t src_ip;		/* low byte of htonl(ip_addr), see create_header() */
		uint16_t port;		/* stored through htons() */
		uint16_t num_msg;	/* presumably the number of messages that follow - TODO confirm */
		uint16_t len_msg;	/* buffer length, stored through htons() */
		uint32_t ack;		/* NOTE(review): meaning not established by this file - confirm */
} msg_header;

/*
 * Tag values. CLIENT and SERVER are stored in nfv_conn.type; create_header()
 * stores a header_type in msg_header.type. S_REQUEST and S_RESPONSE are not
 * referenced elsewhere in this file.
 */
typedef enum _header_type {
	S_REQUEST  = 0x60,
	S_RESPONSE = 0x10,
	CLIENT = 0x01,	/* connection accepted in nfv_accept() */
	SERVER = 0x03,	/* connection completed in nfv_dialed() */
} header_type;

/*
 * Per-server-connection state. nfv_main_handler() obtains it by casting the
 * data[] area of a struct nfv_conn.
 */
struct server_conn {
	/*
	 * NOTE(review): the handlers in this file use the ixev_ctx of the
	 * enclosing struct nfv_conn instead - confirm this member is still needed.
	 */
	struct ixev_ctx ctx;
	size_t bytes_left;	/* presumably bytes still outstanding for the current message - TODO confirm */
	size_t w_offset;	/* presumably the write (send) offset into data - TODO confirm */
	size_t r_offset;	/* offset in data where nfv_main_handler() stores the next received bytes */
	size_t p_offset;	/* presumably a processing offset into data - TODO confirm */
	char data[];		/* buffer; nfv_main_handler() fills it up to msg_size bytes */
};

/*
 * Per-client-connection state. nfv_accept() and reg_hash_entry() obtain it by
 * casting the data[] area of a struct nfv_conn.
 */
struct client_conn {
	struct nfv_conn *sconn;	/* server connection chosen for this client in nfv_accept() */
	struct hlist_node link;	/* chains the client into a tctx.clients bucket */
	size_t bytes_recvd;	/* presumably a receive byte counter - TODO confirm */
	size_t bytes_sent;	/* presumably a send byte counter - TODO confirm */
	char data[];		/* trailing buffer; its size depends on how the pool element is sized */
};

/*
 * Common wrapper for every connection handled by this program. The event
 * handlers recover it from the ixev_ctx pointer with container_of().
 */
struct nfv_conn {
	struct ixev_ctx ctx;	/* event context handed to ixev */
	struct ip_tuple ip;	/* address tuple; copied from the accept argument for clients */
	uint8_t type;		/* CLIENT or SERVER (header_type) */
	unsigned int id;	/* index taken from the per-thread connection counters */
	/*
	 * Type-specific state: cast to struct client_conn or struct server_conn.
	 * NOTE(review): as a char array this member carries no alignment
	 * guarantee for the structs cast onto it - confirm the resulting offset
	 * is suitably aligned.
	 */
	char data[];
};

/* Capacity of thread_ctx.servers, the per-thread array of server connections. */
#define MAX_SERVER_CONN 8
/*
 * Number of client hash buckets per thread. Must remain a power of two:
 * bucket keys are computed as dst_port & (MAX_CLIENT_CONN - 1).
 */
#define MAX_CLIENT_CONN 512 /* 0x1FF + 1*/
/* Thread-count factor used by main() when sizing the server connection datastore. */
#define MAX_CPU 32
/*
 * Hash table entry pairing a list head with a link node.
 * NOTE(review): not referenced elsewhere in this file; thread_ctx.clients uses
 * bare hlist_head buckets instead - confirm whether this type is still needed.
 */
struct conn_hash_entry {
	struct hlist_head head;
	struct hlist_node hash_link;
};

/* Per-thread bookkeeping; one instance lives in thread-local storage (tctx). */
struct thread_ctx {
	int id;				/* thread id, set at the start of nfv_main() */
	unsigned int num_conn_server;	/* next free slot in servers[]; advanced in nfv_dialed() */
	unsigned int num_conn_client;	/* client counter read by nfv_accept() for ids and server selection */
	struct nfv_conn * servers[MAX_SERVER_CONN]; /* server connections, filled in nfv_dialed() */
	struct hlist_head clients[MAX_CLIENT_CONN]; /* client hash buckets keyed by dst_port & (MAX_CLIENT_CONN - 1) */
};
/* Thread-local instance used by every function in this file. */
static __thread struct thread_ctx tctx;

/*
 * Command-line settings filled in by main(): msg_size from -s and nconn
 * from -c. nconn bounds the dial loop in nfv_main().
 */
static unsigned int msg_size, nconn;

/* Backing store (process-wide) and per-thread pool for server connections. */
static struct mempool_datastore server_conn_datastore;
static __thread struct mempool server_conn_pool;

/* Backing store (process-wide) and per-thread pool for client connections. */
static struct mempool_datastore client_conn_datastore;
static __thread struct mempool client_conn_pool;

/* Declared early because nfv_stream_handler() re-installs it. */
static void nfv_main_handler(struct ixev_ctx *ctx, unsigned int reason);

/*
 * IXEVOUT handler installed by nfv_main_handler() after a partial send.
 *
 * Sends the unsent tail of the message buffer. When nothing is left it resets
 * the counter to msg_size and re-installs nfv_main_handler for IXEVIN.
 * The reason argument is not used.
 *
 * NOTE(review): struct nfv_conn as declared in this file has no bytes_left
 * member, so the conn->bytes_left accesses below do not match the
 * declaration. The per-connection send state needs a defined home (e.g. in
 * the client_conn/server_conn area behind conn->data) before this builds.
 */
static void nfv_stream_handler(struct ixev_ctx *ctx, unsigned int reason)
{
	struct nfv_conn *conn = container_of(ctx, struct nfv_conn, ctx);
	/* offset of the first byte that has not been sent yet */
	size_t bytes_so_far = msg_size - conn->bytes_left;
	ssize_t ret;

	ret = ixev_send(ctx, &conn->data[bytes_so_far], conn->bytes_left);
	if (ret < 0) {
		/* -EAGAIN keeps the connection and waits for the next event; anything else closes it */
		if (ret != -EAGAIN)
			ixev_close(ctx);
		return;
	}

	conn->bytes_left -= ret;
	if (!conn->bytes_left) {
		/* whole message sent: go back to receiving */
		conn->bytes_left = msg_size;
		ixev_set_handler(ctx, IXEVIN, &nfv_main_handler);
	}
}

/*
 * create_header - fill the routing fields of a message header.
 * @h:       header to fill; only type, src_ip, port and len_msg are written,
 *           num_msg and ack are left untouched
 * @type:    value stored in h->type
 * @ip_addr: address; the low byte of htonl(ip_addr) becomes h->src_ip
 * @port:    port, stored through htons()
 * @buf_len: message length, stored through htons(); must fit in 16 bits
 *
 * Returns ERROR, without modifying @h, when the derived src_ip byte is zero
 * or @buf_len does not fit in the 16-bit len_msg field. Returns TRUE
 * otherwise.
 *
 * Declared static inline: a plain `inline` definition provides no external
 * definition in C99/C11, so any call the compiler chose not to inline would
 * fail to link.
 */
static inline int
create_header(msg_header *h, header_type type, uint32_t ip_addr, uint16_t port, int buf_len)
{
	uint8_t ip_rank = htonl(ip_addr) & 0xFF;

	if (ip_rank == 0)
		return ERROR;

	/* len_msg is 16 bits wide; refuse lengths that would be silently truncated */
	if (buf_len < 0 || buf_len > 0xFFFF)
		return ERROR;

	h->type = type;
	h->src_ip = ip_rank;
	h->port = htons(port);
	h->len_msg = htons((uint16_t)buf_len);

	return TRUE;
}

/*
 * Main event handler: installed for IXEVIN on accepted connections in
 * nfv_accept() and invoked with IXEVOUT from nfv_dialed(). The reason
 * argument is not used.
 *
 * For CLIENT connections it first calls ixev_recv() on the context of the
 * server connection assigned to the client, storing into that server
 * connection's buffer at r_offset until msg_size bytes are present; a failed
 * or empty receive returns early. All connection types then run the
 * receive-then-send loop below, which hands over to nfv_stream_handler() when
 * a send completes only partially.
 *
 * NOTE(review), open issues visible in this block:
 *  - struct nfv_conn declares no bytes_left member, so every conn->bytes_left
 *    access below does not match the declaration.
 *  - client->sconn->ctx is a struct ixev_ctx, not a pointer; the initializer
 *    of sctx presumably wants its address.
 *  - the branch is labelled "batch from client" but the receive is issued on
 *    the server context - confirm which side is meant to be read.
 *  - the SERVER branch is empty.
 */
static void nfv_main_handler(struct ixev_ctx *ctx, unsigned int reason)
{
	struct nfv_conn *conn = container_of(ctx, struct nfv_conn, ctx);
	ssize_t ret;
	ssize_t bytes_left;

	if (conn->type == CLIENT) {
		/* batch from client */		
		struct client_conn *client = (struct client_conn *)conn->data;
		struct server_conn *sconn = (struct server_conn *)client->sconn->data;
		struct ixev_ctx *sctx = client->sconn->ctx;

		/* room left in the server buffer before it holds msg_size bytes */
		bytes_left = msg_size - sconn->r_offset;

		if(bytes_left != 0) {
			ret = ixev_recv(sctx, &sconn->data[sconn->r_offset], bytes_left);
			//printf("read_len :%d\n",ret);
			if (ret <= 0) {
				/* -EAGAIN: nothing to read yet; any other result closes sctx */
				if (ret != -EAGAIN)
					ixev_close(sctx);
				return;
			}
			sconn->r_offset += ret;
		}
	}
	else if (conn->type == SERVER){
		/* extract from server */
	}

	while (1) {
		/* bytes of the current message already received into conn->data */
		size_t bytes_so_far = msg_size - conn->bytes_left;

		ret = ixev_recv(ctx, &conn->data[bytes_so_far],
				conn->bytes_left);
		if (ret <= 0) {
			if (ret != -EAGAIN)
				ixev_close(ctx);
			return;
		}

		conn->bytes_left -= ret;
		/* message still incomplete: wait for the next IXEVIN event */
		if (conn->bytes_left)
			return;

		/* full message received: send it back from the start of the buffer */
		conn->bytes_left = msg_size;
		ret = ixev_send(ctx, &conn->data[0], conn->bytes_left);
		/* -EAGAIN is treated as "zero bytes sent" rather than as a failure */
		if (ret == -EAGAIN)
			ret = 0;
		if (ret < 0) {
			ixev_close(ctx);
			return;
		}

		conn->bytes_left -= ret;
		if (conn->bytes_left) {
			/* partial send: nfv_stream_handler() finishes it on IXEVOUT */
			ixev_set_handler(ctx, IXEVOUT, &nfv_stream_handler);
			return;
		}

		/* message fully sent: reset and try to receive the next one */
		conn->bytes_left = msg_size;
	}
}

/*
 * Insert a client connection into this thread's client hash table.
 *
 * @conn must be a CLIENT connection whose data[] area holds a
 * struct client_conn. The bucket is selected from the low bits of
 * conn->ip.dst_port, the same key find_client() computes.
 */
static inline void reg_hash_entry(struct nfv_conn *conn)
{
	struct client_conn *client = (struct client_conn *)conn->data;
	unsigned int key = conn->ip.dst_port & (MAX_CLIENT_CONN - 1);

	/*
	 * tctx.clients[] holds hlist_head buckets, so the bucket must not be
	 * kept in a hlist_node pointer. The head is passed first, matching the
	 * head-first argument order of hlist_for_each() used in find_client().
	 */
	hlist_add_head(&tctx.clients[key], &client->link);
}

/*
 * Look up a registered client connection by address tuple.
 *
 * @id: tuple to match; only dst_port and dst_ip take part in the comparison,
 *      and dst_port also selects the hash bucket.
 *
 * Returns the ixev context of the matching connection on the calling thread,
 * or NULL when no client with that destination is registered.
 */
struct ixev_ctx *find_client(struct ip_tuple *id)
{
	struct hlist_head *bucket;
	struct hlist_node *pos;
	unsigned int key;

	key = id->dst_port & (MAX_CLIENT_CONN - 1);
	bucket = &tctx.clients[key];

	hlist_for_each(bucket, pos) {
		struct client_conn *client = hlist_entry(pos, struct client_conn, link);
		/*
		 * The client state lives in the data[] area of its nfv_conn.
		 * Step back by the member offset by hand: data is a flexible
		 * array, which type-checking container_of() macros reject.
		 */
		struct nfv_conn *conn = (struct nfv_conn *)
			((char *)client - offsetof(struct nfv_conn, data));

		if (id->dst_port == conn->ip.dst_port &&
		    id->dst_ip == conn->ip.dst_ip)
			return &conn->ctx;
	}

	return NULL;
}

/*
 * ixev accept callback: set up state for a newly accepted client connection.
 *
 * @id: address tuple of the incoming connection; copied into the connection.
 *
 * Allocates an nfv_conn from the per-thread client pool, assigns it a server
 * connection round-robin over the servers registered so far, registers it in
 * the client hash table and installs nfv_main_handler for IXEVIN.
 *
 * Returns the new connection's context, or NULL when no server connection is
 * available yet or the pool is exhausted.
 */
static struct ixev_ctx *nfv_accept(struct ip_tuple *id)
{
	/* NOTE: we accept everything right now, did we want a port? */
	struct client_conn *client;
	struct nfv_conn *conn;

	/* Without a server there is nothing to pair the client with (and no valid divisor below). */
	if (!tctx.num_conn_server)
		return NULL;

	conn = mempool_alloc(&client_conn_pool);
	if (!conn)
		return NULL;

	conn->type = CLIENT;
	conn->ip = *id;
	conn->id = tctx.num_conn_client++;

	client = (struct client_conn *)conn->data;
	client->bytes_recvd = 0;
	client->bytes_sent = 0;

	/*
	 * Spread clients over the registered servers. The modulo keeps the
	 * index below num_conn_server, i.e. inside the filled part of servers[].
	 */
	client->sconn = tctx.servers[conn->id % tctx.num_conn_server];
	reg_hash_entry(conn);

	ixev_ctx_init(&conn->ctx);
	ixev_set_handler(&conn->ctx, IXEVIN, &nfv_main_handler);

	return &conn->ctx;
}

static void nfv_release(struct ixev_ctx *ctx)
{
	struct nfv_conn *conn = container_of(ctx, struct nfv_conn, ctx);

	mempool_free(&nfv_conn_pool, conn);
}

static void nfv_dialed(struct ixev_ctx *ctx, long ret)
{
	if (ret)
		fprintf(stderr, "failed to connect, ret = %ld\n", ret);

	struct nfv_conn *conn = container_of(ctx, struct client_conn, ctx);
	conn->type = SERVER;
	conn->id = tctx.num_conn_server;

	tctx.servers[tctx.num_conn_server++] = conn;

	printf("thread: %d: new connection %u\n", tctx.id, tctx.num_conn);
	fflush(stdout);

	ixev_set_handler(ctx, IXEVOUT, &nfv_main_handler);
	nfv_main_handler(&c->ctx, IXEVOUT);
}

/* Connection callbacks handed to ixev_init() in main(). */
static struct ixev_conn_ops nfv_conn_ops = {
	.accept = nfv_accept,	/* incoming client connection */
	.release = nfv_release,	/* connection storage can be reclaimed */
	.dialed = nfv_dialed,	/* outgoing connection attempt completed */
};

static void *nfv_main(void *arg)
{
	int ret;

	tctx->num_conn = 0;
	tctx.id = *(unsigned int *) arg;

	printf("nfv thread %d started\n", tctx->id);

	ret = ixev_init_thread();
	if (ret) {
		fprintf(stderr, "unable to init IXEV\n");
		return NULL;
	};

	ret = mempool_create(&server_conn_pool, &server_conn_datastore);
	if (ret) {
		fprintf(stderr, "unable to create server mempool\n");
		return NULL;
	}

	ret = mempool_create(&client_conn_pool, &client_conn_datastore);
	if (ret) {
		fprintf(stderr, "unable to create client mempool\n");
		return NULL;
	}

	/* connect to server */
	for (i = 0; i < nconn; i++) {
		//struct client_conn *c = malloc(sizeof(struct client_conn));
		struct server_conn *c = mempool_alloc(&server_conn_pool);
		if (!c) {
			fprintf(stderr, "unable to malloc\n");
			exit(-1);
		}
		c->conn_id = i;
		c->ip.dst_port = dst_port;
		c->ip.dst_ip = dst_ip;
		c->thread_id = tctx->id;
		c->cnt = nrequests;
		ixev_dial(&c->ctx, &c->id);
	}

	while (1) {
		ixev_wait();
	}

	return NULL;
}

int main(int argc, char *argv[])
{
	int i, nr_cpu, opt;
	pthread_t tid;
	int ret;
	unsigned int client_conn_pool_entries;

	client_conn_pool_entries = 16 * 4096;

	while ((opt = getopt(argc, argv, "scn:")) != -1) {
		switch (opt) {
		case 's':
			msg_size = atol(optarg);
			break;
		case 'c':
			nconn = atol(optarg);
			break;
		case 'n':
			client_conn_pool_entries = atoi(optarg);
			break;
		default: /* '?' */
			fprintf(stderr, "Usage: %s -s MSG_SIZE\n\
					-c NUM_CONNECTIONS\n\
					[-n MAX_CONNECTIONS]\n", argv[0]);
			return -1;
		}
	}

	client_conn_pool_entries = ROUND_UP(client_conn_pool_entries, MEMPOOL_DEFAULT_CHUNKSIZE);

	ret = ixev_init(&nfv_conn_ops);
	if (ret) {
		fprintf(stderr, "failed to initialize ixev\n");
		return ret;
	}

	ret = mempool_create_datastore(&server_conn_datastore, MAX_CPU * nconn sizeof(struct nfv_conn) + msg_size, 0, MEMPOOL_DEFAULT_CHUNKSIZE, "server_conn");
	if (ret) {
		fprintf(stderr, "unable to create mempool\n");
		return ret;
	}

	ret = mempool_create_datastore(&client_conn_datastore, client_conn_pool_entries, sizeof(struct client_conn), 0, MEMPOOL_DEFAULT_CHUNKSIZE, "client_conn");
	if (ret) {
		fprintf(stderr, "unable to create mempool\n");
		return ret;
	}

	nr_cpu = sys_nrcpus();
	if (nr_cpu < 1) {
		fprintf(stderr, "got invalid cpu count %d\n", nr_cpu);
		exit(-1);
	}
	nr_cpu--; /* don't count the main thread */

	sys_spawnmode(true);

	for (i = 0; i < nr_cpu; i++) {
		if (pthread_create(&tid, NULL, nfv_main, NULL)) {
			fprintf(stderr, "failed to spawn thread %d\n", i);
			exit(-1);
		}
	}

	nfv_main(NULL);
	return 0;
}

